agentmux_srv\backend\wshutil/
rpcio.rs1#![allow(dead_code)]
2use std::io::{BufRead, BufReader, Read, Write};
15use tokio::sync::mpsc;
16use super::osc::encode_wave_osc_bytes;
17
18pub fn adapt_stream_to_msg_ch(
22 input: impl Read + Send + 'static,
23 output: mpsc::Sender<Vec<u8>>,
24) -> std::thread::JoinHandle<Result<(), String>> {
25 std::thread::spawn(move || {
26 let reader = BufReader::new(input);
27 for line in reader.lines() {
28 let line = line.map_err(|e| format!("read error: {}", e))?;
29 let trimmed = line.trim().to_string();
30 if trimmed.is_empty() {
31 continue;
32 }
33 output
34 .blocking_send(trimmed.into_bytes())
35 .map_err(|e| format!("channel send error: {}", e))?;
36 }
37 Ok(())
38 })
39}
40
41pub async fn adapt_output_ch_to_stream(
45 mut output_ch: mpsc::Receiver<Vec<u8>>,
46 mut output: impl Write,
47) -> Result<(), String> {
48 while let Some(msg) = output_ch.recv().await {
49 output
50 .write_all(&msg)
51 .map_err(|e| format!("write error: {}", e))?;
52 output
53 .write_all(b"\n")
54 .map_err(|e| format!("newline write error: {}", e))?;
55 output.flush().map_err(|e| format!("flush error: {}", e))?;
56 }
57 Ok(())
58}
59
60pub async fn adapt_msg_ch_to_pty(
64 mut output_ch: mpsc::Receiver<Vec<u8>>,
65 osc_esc: &str,
66 mut output: impl Write,
67) -> Result<(), String> {
68 if osc_esc.len() != 5 {
69 return Err("osc_esc must be 5 characters".to_string());
70 }
71 while let Some(msg) = output_ch.recv().await {
72 let encoded =
73 encode_wave_osc_bytes(osc_esc, &msg)?;
74 output
75 .write_all(&encoded)
76 .map_err(|e| format!("write error: {}", e))?;
77 output.flush().map_err(|e| format!("flush error: {}", e))?;
78 }
79 Ok(())
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two newline-terminated JSON lines must arrive as two messages, in order and
    /// without their trailing newlines, and the reader thread must finish cleanly.
    #[tokio::test]
    async fn test_adapt_stream_to_msg_ch() {
        let input = Cursor::new(b"{\"cmd\":\"test\"}\n{\"cmd\":\"hello\"}\n".to_vec());
        let (sender, mut receiver) = mpsc::channel(10);

        let reader_thread = adapt_stream_to_msg_ch(input, sender);

        for expected in ["{\"cmd\":\"test\"}", "{\"cmd\":\"hello\"}"] {
            let received = receiver.recv().await.unwrap();
            assert_eq!(String::from_utf8(received).unwrap(), expected);
        }

        reader_thread.join().unwrap().unwrap();
    }

    /// A single queued message must be written followed by exactly one newline.
    #[tokio::test]
    async fn test_adapt_output_ch_to_stream() {
        let (sender, receiver) = mpsc::channel(10);
        sender.send(b"{\"result\":\"ok\"}".to_vec()).await.unwrap();
        // Drop the only sender so the adapter's receive loop terminates.
        drop(sender);

        let mut sink = Vec::new();
        adapt_output_ch_to_stream(receiver, &mut sink).await.unwrap();

        let written = String::from_utf8(sink).unwrap();
        assert_eq!(written, "{\"result\":\"ok\"}\n");
    }
}